package com.sun;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo entry point that registers tables through Flink SQL DDL.
 *
 * <p>Currently only the JDBC-backed table {@code dwd_test} is registered. The
 * commented-out statements below are earlier experiments with the
 * {@code mysql-cdc} connector and are kept for reference.
 *
 * <p>NOTE(review): host names and database credentials are hard-coded in the
 * DDL strings; consider loading them from configuration instead.
 */
public class FlinkCdcToSql {

    /**
     * Creates the stream/table environments and registers the {@code dwd_test} table.
     *
     * @param args command-line arguments (unused)
     * @throws Exception kept in the signature for compatibility with existing callers
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Example (disabled): create a Flink MySQL-CDC source table and print its rows.
//        tableEnv.executeSql("CREATE TABLE admin_info (" +
//                " id INT," +
//                " name STRING," +
//                " phone_num STRING" +
//                ") WITH (" +
//                " 'connector' = 'mysql-cdc'," +
//                " 'scan.startup.mode' = 'latest-offset'," +
//                " 'hostname' = '192.168.204.129'," +
//                " 'port' = '3306'," +
//                " 'username' = 'root'," +
//                " 'password' = '123456'," +
//                " 'database-name' = 'hmdp'," +
//                " 'table-name' = 'z_user_info'" +
//                ")");
//
//        tableEnv.executeSql("select * from admin_info").print();


        // Example (disabled): CDC source table for smart_village.c_building_unit.
//        tableEnv.executeSql("CREATE TABLE c_building_unit (" +
//                "  id STRING," +
//                "  name STRING," +
//                "  building_id STRING," +
//                "  PRIMARY KEY (id) NOT ENFORCED" +
//                //PRIMARY KEY (id) NOT ENFORCED
//                ") WITH (" +
//                "  'connector' = 'mysql-cdc'," +
//                "  'hostname' = '192.168.204.129'," +
//                "  'port' = '3306'," +
//                "  'username' = 'root'," +
//                "  'password' = '123456'," +
//                "  'database-name' = 'smart_village'," +
//                "  'table-name' = 'c_building_unit'" +
//                ")");


        // Register the JDBC table. The URL must contain exactly one '?' before the
        // query parameters; otherwise the first key is not parsed as 'characterEncoding'.
        tableEnv.executeSql("CREATE TABLE dwd_test (id STRING, name STRING, building_id STRING, " +
                "  PRIMARY KEY (id,building_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://192.168.204.129:3306/hmdp?characterEncoding=utf-8&useSSL=false'," +
                "  'username' = 'root'," +
                "  'password' = '123456'," +
                "  'table-name' = 'dwd_test'" +
                ")");

        // No env.execute() here: executeSql() runs each statement itself, and this
        // program adds no DataStream operators, so env.execute() would fail with
        // "No operators defined in streaming topology. Cannot execute."
    }
}
